package me.seu.demo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * 定义了三个消费者:
 * 1、同组内会均分同主题下分区的消息
 * 2、不同组均会收到主体下全量数据
 *
 * @author liangfeihu
 * @since 2020/3/9 12:08
 */
@Slf4j
@Component
public class KafkaSendToConsumer {

    /**
     * topic-kl队列消息加工后转移至topic-ckl队列
     *
     * @param input
     * @return
     */
    @KafkaListener(id = "webGroup", topics = "topic-kl")
    @SendTo("topic-ckl")
    public String listen(String input) {
        String name = Thread.currentThread().getName();
        log.info("[{}]receive input value: {}", name, input);
        return input + " hello!";
    }

    @KafkaListener(id = "webGroup1", topics = "topic-ckl")
    public void listen2(String input) {
        String name = Thread.currentThread().getName();
        log.info("[{}]receive input value: {}", name, input);
    }

    /**
     * 处理失败次数超过限制将消息转移至死信队列
     *
     * @param input
     * @return
     */
    @KafkaListener(id = "webGroup2", topics = "topic-kln")
    public String listen3(String input) {
        log.info("input value: {}", input);
        throw new RuntimeException("dlt");
    }

    @KafkaListener(id = "dltGroup", topics = "topic-kln.DLT")
    public void dltListen(String input) {
        log.info("Received from DLT: " + input);
    }

}
